過去我們針對資料做處理、整理、清洗等工作,這次則針對 SQL 相關主題進行操作。
今天我們聚焦於 SQL schema 的管理:用 Rust 打造一個類似 Flyway 或 Liquibase 的資料庫遷移工具,
幫助開發者自動化管理資料庫 schema 版本。當團隊規模擴大、專案複雜度提升時,
手動管理資料庫變更不僅容易出錯,也難以追蹤歷史變更。
我們使用 sqlx 進行資料庫操作。先建立專案:
cargo new db-migrator
cd db-migrator
接著在 Cargo.toml 中加入以下依賴:
[dependencies]
# Async SQL toolkit: Tokio runtime + native TLS, with Postgres and SQLite drivers.
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "postgres", "sqlite"] }
# Async runtime required by sqlx's runtime-tokio feature and #[tokio::main].
tokio = { version = "1.0", features = ["full"] }
# CLI argument parsing via #[derive(Parser)] / #[derive(Subcommand)].
clap = { version = "4.0", features = ["derive"] }
# Timestamps for migration records (DateTime<Utc>).
chrono = "0.4"
# Application-level error handling (anyhow::Result, anyhow!).
anyhow = "1.0"
# Parses V{version}__{description}.sql filenames.
regex = "1.0"
# SHA-256 checksums of migration file contents.
sha2 = "0.10"
Migration 檔案採用固定命名格式 V{version}__{description}.sql
例如:
V001__create_users_table.sql
V002__add_email_index.sql
V003__create_posts_table.sql
-- Bookkeeping table: one row per applied migration.
CREATE TABLE IF NOT EXISTS schema_migrations (
    version VARCHAR(50) PRIMARY KEY,      -- e.g. "001"; taken from the filename
    description TEXT NOT NULL,            -- human-readable description from the filename
    checksum VARCHAR(64) NOT NULL,        -- hex SHA-256 of the SQL file at apply time
    executed_at TIMESTAMP NOT NULL,       -- when the migration ran
    execution_time_ms INTEGER NOT NULL,   -- wall-clock duration of the migration
    success BOOLEAN NOT NULL              -- whether execution succeeded
);
Migration 定義結構
use chrono::{DateTime, Utc};
use std::path::PathBuf;
/// A migration parsed from a `V{version}__{description}.sql` file on disk.
#[derive(Debug, Clone)]
pub struct Migration {
    /// Version string taken from the filename digits (e.g. "001").
    pub version: String,
    /// Human-readable description; filename underscores become spaces.
    pub description: String,
    /// Path the migration was loaded from.
    pub file_path: PathBuf,
    /// Raw SQL text of the file.
    pub sql_content: String,
    /// Hex-encoded SHA-256 of `sql_content`, used to detect later edits.
    pub checksum: String,
}
/// One row of the `schema_migrations` table: a migration that was executed.
#[derive(Debug, Clone)]
pub struct MigrationRecord {
    /// Version string as stored in the table (matches `Migration::version`).
    pub version: String,
    /// Description recorded at apply time.
    pub description: String,
    /// Checksum recorded at apply time; comparing it with the file's
    /// current checksum detects post-apply edits.
    pub checksum: String,
    /// When the migration was executed (UTC).
    pub executed_at: DateTime<Utc>,
    /// Wall-clock execution time in milliseconds.
    pub execution_time_ms: i32,
    /// Whether execution succeeded (the code shown only ever inserts `true`).
    pub success: bool,
}
impl Migration {
    /// Parse a migration from a file named `V{version}__{description}.sql`.
    ///
    /// The version is the digit run after `V`; underscores in the
    /// description become spaces. Returns an error if the file cannot be
    /// read or the filename does not match the pattern.
    pub fn from_file(path: PathBuf) -> anyhow::Result<Self> {
        // Compile the filename pattern once for the whole process;
        // `from_file` is called per file in the migrations directory, so
        // recompiling the regex each call is wasted work.
        static FILENAME_RE: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
        let re = FILENAME_RE.get_or_init(|| {
            regex::Regex::new(r"^V(\d+)__(.+)\.sql$").expect("filename regex is valid")
        });

        let filename = path
            .file_name()
            .and_then(|n| n.to_str())
            .ok_or_else(|| anyhow::anyhow!("Invalid filename"))?;
        // e.g. V001__create_users_table.sql -> ("001", "create_users_table")
        let caps = re
            .captures(filename)
            .ok_or_else(|| anyhow::anyhow!("Invalid migration filename format: {}", filename))?;
        let version = caps[1].to_string();
        let description = caps[2].replace('_', " ");
        let sql_content = std::fs::read_to_string(&path)?;
        let checksum = Self::calculate_checksum(&sql_content);
        Ok(Migration {
            version,
            description,
            file_path: path,
            sql_content,
            checksum,
        })
    }

    /// SHA-256 of the raw SQL text, hex-encoded (64 chars) — used to
    /// detect edits to already-applied migration files.
    fn calculate_checksum(content: &str) -> String {
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(content.as_bytes());
        format!("{:x}", hasher.finalize())
    }
}
這裡我們製作 Migrator 核心邏輯
use sqlx::{Pool, Postgres, Row};
use std::path::Path;
use std::time::Instant;
/// Core migration engine: owns the database connection pool and the
/// directory of `V*__*.sql` migration files.
pub struct Migrator {
    /// sqlx Postgres connection pool.
    pool: Pool<Postgres>,
    /// Directory scanned for migration files.
    migrations_dir: PathBuf,
}
impl Migrator {
    /// Connect to Postgres at `database_url` and root the migrator at
    /// `migrations_dir`. Fails if the database is unreachable.
    pub async fn new(database_url: &str, migrations_dir: PathBuf) -> anyhow::Result<Self> {
        let pool = Pool::<Postgres>::connect(database_url).await?;
        Ok(Migrator {
            pool,
            migrations_dir,
        })
    }

    /// Create the `schema_migrations` bookkeeping table if it does not
    /// already exist. Idempotent — safe to call before every run.
    pub async fn init(&self) -> anyhow::Result<()> {
        println!("🔧 Initializing schema_migrations table...");
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS schema_migrations (
                version VARCHAR(50) PRIMARY KEY,
                description TEXT NOT NULL,
                checksum VARCHAR(64) NOT NULL,
                executed_at TIMESTAMP NOT NULL,
                execution_time_ms INTEGER NOT NULL,
                success BOOLEAN NOT NULL
            )
            "#,
        )
        .execute(&self.pool)
        .await?;
        println!("✅ Schema migrations table ready");
        Ok(())
    }

    /// Load every parseable `*.sql` migration in the migrations directory,
    /// sorted by numeric version. Files whose names don't match the
    /// `V{n}__{desc}.sql` pattern are skipped with a warning rather than
    /// aborting the whole run.
    pub async fn load_migrations(&self) -> anyhow::Result<Vec<Migration>> {
        let mut migrations = Vec::new();
        let entries = std::fs::read_dir(&self.migrations_dir)?;
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if path.extension().and_then(|s| s.to_str()) == Some("sql") {
                match Migration::from_file(path.clone()) {
                    Ok(migration) => migrations.push(migration),
                    Err(e) => {
                        eprintln!("⚠️ Failed to parse migration {:?}: {}", path, e);
                    }
                }
            }
        }
        // Numeric sort so V2 precedes V10 (lexicographic order would not).
        // sort_by_cached_key parses each version once instead of on every
        // comparison; u64 tolerates long version numbers; unparsable
        // versions sort first as 0.
        migrations.sort_by_cached_key(|m| m.version.parse::<u64>().unwrap_or(0));
        Ok(migrations)
    }

    /// Fetch every row of `schema_migrations`, ordered by version.
    pub async fn get_applied_migrations(&self) -> anyhow::Result<Vec<MigrationRecord>> {
        let records = sqlx::query_as::<_, MigrationRecord>(
            "SELECT version, description, checksum, executed_at, execution_time_ms, success
             FROM schema_migrations
             ORDER BY version",
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(records)
    }

    /// Apply every migration on disk that is not yet recorded in
    /// `schema_migrations`, in version order. Also warns (non-fatally)
    /// about checksum drift on already-applied migrations.
    pub async fn migrate(&self) -> anyhow::Result<()> {
        println!("🚀 Starting database migration...\n");
        let migrations = self.load_migrations().await?;
        let applied = self.get_applied_migrations().await?;

        // Warn when an already-applied file was edited after it ran — its
        // stored checksum no longer matches the file on disk. Non-fatal,
        // but it flags history that can no longer be reproduced.
        for record in &applied {
            if let Some(m) = migrations.iter().find(|m| m.version == record.version) {
                if m.checksum != record.checksum {
                    eprintln!(
                        "⚠️ Checksum mismatch for V{}: file changed after it was applied",
                        m.version
                    );
                }
            }
        }

        let applied_versions: std::collections::HashSet<_> =
            applied.iter().map(|r| r.version.clone()).collect();
        let pending: Vec<_> = migrations
            .iter()
            .filter(|m| !applied_versions.contains(&m.version))
            .collect();
        if pending.is_empty() {
            println!("✅ Database is up to date. No pending migrations.");
            return Ok(());
        }
        println!("📋 Found {} pending migration(s):\n", pending.len());
        for migration in &pending {
            println!("  V{} - {}", migration.version, migration.description);
        }
        println!();
        // Stop at the first failure so later migrations never run on top of
        // a broken schema.
        for migration in pending {
            self.apply_migration(migration).await?;
        }
        println!("\n🎉 All migrations completed successfully!");
        Ok(())
    }

    /// Run one migration inside a transaction: execute its SQL, then insert
    /// its bookkeeping row. Both commit together, so a failed migration
    /// leaves no record (note: the `success = false` path is never
    /// persisted by this code).
    async fn apply_migration(&self, migration: &Migration) -> anyhow::Result<()> {
        println!("⏳ Applying V{} - {}...", migration.version, migration.description);
        let start = Instant::now();
        let mut tx = self.pool.begin().await?;
        // NOTE(review): sqlx sends this as a prepared statement, and
        // Postgres prepared statements accept a single statement —
        // multi-statement migration files may be rejected; confirm against
        // the sqlx version in use.
        match sqlx::query(&migration.sql_content).execute(&mut *tx).await {
            Ok(_) => {
                let duration = start.elapsed().as_millis() as i32;
                // Record the successful migration in the same transaction.
                sqlx::query(
                    r#"
                    INSERT INTO schema_migrations
                    (version, description, checksum, executed_at, execution_time_ms, success)
                    VALUES ($1, $2, $3, $4, $5, $6)
                    "#,
                )
                .bind(&migration.version)
                .bind(&migration.description)
                .bind(&migration.checksum)
                .bind(Utc::now())
                .bind(duration)
                .bind(true)
                .execute(&mut *tx)
                .await?;
                tx.commit().await?;
                println!("✅ Applied V{} in {}ms", migration.version, duration);
                Ok(())
            }
            Err(e) => {
                // Roll back so a partially-applied migration leaves the
                // schema untouched (Postgres DDL is transactional).
                tx.rollback().await?;
                println!("❌ Failed to apply V{}: {}", migration.version, e);
                Err(e.into())
            }
        }
    }

    /// Print a status table: every migration on disk plus whether and when
    /// it was applied.
    pub async fn status(&self) -> anyhow::Result<()> {
        println!("📊 Migration Status\n");
        let migrations = self.load_migrations().await?;
        let applied = self.get_applied_migrations().await?;
        let applied_map: std::collections::HashMap<_, _> =
            applied.iter().map(|r| (r.version.clone(), r)).collect();
        println!(
            "{:<10} {:<40} {:<15} {:<20}",
            "Version", "Description", "Status", "Executed At"
        );
        println!("{}", "-".repeat(85));
        for migration in migrations {
            // Truncate on a char boundary: the previous byte slice
            // `&description[..38]` panics when byte 38 falls inside a
            // multi-byte UTF-8 character.
            let desc: String = migration.description.chars().take(38).collect();
            if let Some(record) = applied_map.get(&migration.version) {
                let status = if record.success { "✅ Applied" } else { "❌ Failed" };
                println!(
                    "{:<10} {:<40} {:<15} {:<20}",
                    format!("V{}", migration.version),
                    desc,
                    status,
                    record.executed_at.format("%Y-%m-%d %H:%M:%S")
                );
            } else {
                println!(
                    "{:<10} {:<40} {:<15} {:<20}",
                    format!("V{}", migration.version),
                    desc,
                    "⏳ Pending",
                    "-"
                );
            }
        }
        Ok(())
    }
}
// Manual FromRow impl so MigrationRecord maps straight off a Postgres row;
// a missing or mistyped column surfaces as sqlx::Error instead of a panic.
impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for MigrationRecord {
    fn from_row(row: &sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
        // Pull each column into a local first, then assemble the record
        // with field-shorthand construction.
        let version = row.try_get("version")?;
        let description = row.try_get("description")?;
        let checksum = row.try_get("checksum")?;
        let executed_at = row.try_get("executed_at")?;
        let execution_time_ms = row.try_get("execution_time_ms")?;
        let success = row.try_get("success")?;
        Ok(MigrationRecord {
            version,
            description,
            checksum,
            executed_at,
            execution_time_ms,
            success,
        })
    }
}
最後是 CLI 進入點 (main.rs):
use clap::{Parser, Subcommand};
// clap-derived CLI definition. Field notes use plain `//` comments on
// purpose: `///` doc comments on clap fields would become --help text.
#[derive(Parser)]
#[command(name = "db-migrator")]
#[command(about = "Database migration tool for managing schema versions")]
struct Cli {
    #[command(subcommand)]
    command: Commands,
    // -d / --database-url; default assumes a local Postgres database "mydb".
    #[arg(short, long, default_value = "postgres://localhost/mydb")]
    database_url: String,
    // -m / --migrations-dir; main() creates this directory if it is missing.
    #[arg(short, long, default_value = "./migrations")]
    migrations_dir: String,
}
// Available subcommands; each `///` line below doubles as clap help text.
#[derive(Subcommand)]
enum Commands {
    /// Initialize the schema_migrations table
    Init,
    /// Apply all pending migrations
    Migrate,
    /// Show migration status
    Status,
    /// Create a new migration file
    Create {
        /// Description of the migration
        description: String,
    },
}
/// CLI entry point: parse arguments, make sure the migrations directory
/// exists, then dispatch to the requested subcommand.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Cli::parse();
    let migrations_dir = PathBuf::from(&args.migrations_dir);
    // The tool always works relative to an existing migrations directory.
    std::fs::create_dir_all(&migrations_dir)?;

    match args.command {
        // `create` is purely local — no database connection needed.
        Commands::Create { description } => create_migration(&migrations_dir, &description)?,
        // The remaining subcommands all talk to the database.
        command => {
            let migrator = Migrator::new(&args.database_url, migrations_dir).await?;
            match command {
                Commands::Init => migrator.init().await?,
                Commands::Migrate => {
                    // Ensure the bookkeeping table exists before migrating.
                    migrator.init().await?;
                    migrator.migrate().await?;
                }
                Commands::Status => migrator.status().await?,
                Commands::Create { .. } => unreachable!("handled above"),
            }
        }
    }
    Ok(())
}
/// Create a new empty migration file `V{next:03}__{description}.sql` in
/// `dir`, numbered one past the highest version already present.
fn create_migration(dir: &Path, description: &str) -> anyhow::Result<()> {
    // Find the highest existing version by parsing the full digit run
    // between "V" and "__". The previous `filename[1..4]` assumed exactly
    // three digits: V1__x.sql and V1000__x.sql were mis-parsed or silently
    // skipped, which could reissue an already-used version number.
    let existing = std::fs::read_dir(dir)?
        .filter_map(|e| e.ok())
        .filter_map(|e| {
            let filename = e.file_name().to_string_lossy().to_string();
            if !filename.ends_with(".sql") {
                return None;
            }
            let rest = filename.strip_prefix('V')?;
            rest.split_once("__")?.0.parse::<u64>().ok()
        })
        .max()
        .unwrap_or(0);
    let next_version = existing + 1;
    // Normalize the description into a filename-safe token.
    let sanitized_desc = description.replace(' ', "_").to_lowercase();
    let filename = format!("V{:03}__{}.sql", next_version, sanitized_desc);
    let filepath = dir.join(&filename);
    let template = format!(
        "-- Migration: {}\n-- Created: {}\n\n-- Add your SQL here\n\n",
        description,
        Utc::now().format("%Y-%m-%d %H:%M:%S")
    );
    std::fs::write(&filepath, template)?;
    println!("✅ Created migration: {}", filename);
    println!("📝 Edit file: {}", filepath.display());
    Ok(())
}
打完收工!
step 1 : 這裏初始化
cargo run -- init --database-url postgres://user:pass@localhost/mydb
step 2 : 建立新的 migration
cargo run -- create "create users table"
這會生成 migrations/V001__create_users_table.sql
-- Migration: create users table
-- Created: 2025-10-04 10:30:00
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
step 3 : 執行 migrate
cargo run -- migrate
step4 : 查看狀態
cargo run -- status